/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigquery;



import com.google.api.services.bigquery.model.TableFieldSchema;

import com.google.api.services.bigquery.model.TableRow;

import com.google.api.services.bigquery.model.TableSchema;

import com.google.auto.value.AutoValue;

import org.apache.avro.generic.GenericRecord;

import com.bff.gaia.unified.sdk.schemas.Schema;

import com.bff.gaia.unified.sdk.schemas.Schema.Field;

import com.bff.gaia.unified.sdk.schemas.Schema.FieldType;

import com.bff.gaia.unified.sdk.schemas.Schema.TypeName;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.transforms.SerializableFunctions;

import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableMap;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableSet;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.Lists;

import com.bff.gaia.unified.vendor.guava.com.google.common.io.BaseEncoding;

import org.joda.time.DateTime;

import org.joda.time.Instant;

import org.joda.time.ReadableInstant;

import org.joda.time.chrono.ISOChronology;

import org.joda.time.format.DateTimeFormatter;

import org.joda.time.format.DateTimeFormatterBuilder;



import java.io.Serializable;

import java.math.BigDecimal;

import java.nio.ByteBuffer;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.Set;

import java.util.function.Function;

import java.util.stream.IntStream;



import static java.util.stream.Collectors.toList;

import static java.util.stream.Collectors.toMap;

import static com.bff.gaia.unified.sdk.values.Row.toRow;



/** Utility methods for BigQuery related operations. */

public class BigQueryUtils {



  /** Options for how to convert BigQuery data to Unified data. */

  @AutoValue

  public abstract static class ConversionOptions implements Serializable {



    /**

     * Controls whether to truncate timestamps to millisecond precision lossily, or to crash when

     * truncation would result.

     */

    public enum TruncateTimestamps {

      /** Reject timestamps with greater-than-millisecond precision. */

      REJECT,



      /** Truncate timestamps to millisecond precision. */

      TRUNCATE;

    }



    public abstract TruncateTimestamps getTruncateTimestamps();



    public static Builder builder() {

      return new AutoValue_BigQueryUtils_ConversionOptions.Builder()

          .setTruncateTimestamps(TruncateTimestamps.REJECT);

    }



    /** Builder for {@link ConversionOptions}. */

    @AutoValue.Builder

    public abstract static class Builder {

      public abstract Builder setTruncateTimestamps(TruncateTimestamps truncateTimestamps);



      public abstract ConversionOptions build();

    }

  }



  private static final Map<TypeName, StandardSQLTypeName> BEAM_TO_BIGQUERY_TYPE_MAPPING =

      ImmutableMap.<TypeName, StandardSQLTypeName>builder()

          .put(TypeName.BYTE, StandardSQLTypeName.INT64)

          .put(TypeName.INT16, StandardSQLTypeName.INT64)

          .put(TypeName.INT32, StandardSQLTypeName.INT64)

          .put(TypeName.INT64, StandardSQLTypeName.INT64)

          .put(TypeName.FLOAT, StandardSQLTypeName.FLOAT64)

          .put(TypeName.DOUBLE, StandardSQLTypeName.FLOAT64)

          .put(TypeName.DECIMAL, StandardSQLTypeName.NUMERIC)

          .put(TypeName.BOOLEAN, StandardSQLTypeName.BOOL)

          .put(TypeName.ARRAY, StandardSQLTypeName.ARRAY)

          .put(TypeName.ROW, StandardSQLTypeName.STRUCT)

          .put(TypeName.DATETIME, StandardSQLTypeName.TIMESTAMP)

          .put(TypeName.STRING, StandardSQLTypeName.STRING)

          .put(TypeName.BYTES, StandardSQLTypeName.BYTES)

          .build();



  private static final Map<TypeName, Function<String, Object>> JSON_VALUE_PARSERS =

      ImmutableMap.<TypeName, Function<String, Object>>builder()

          .put(TypeName.BYTE, Byte::valueOf)

          .put(TypeName.INT16, Short::valueOf)

          .put(TypeName.INT32, Integer::valueOf)

          .put(TypeName.INT64, Long::valueOf)

          .put(TypeName.FLOAT, Float::valueOf)

          .put(TypeName.DOUBLE, Double::valueOf)

          .put(TypeName.DECIMAL, BigDecimal::new)

          .put(TypeName.BOOLEAN, Boolean::valueOf)

          .put(TypeName.STRING, str -> str)

          .put(

              TypeName.DATETIME,

              str ->

                  new DateTime(

                      (long) (Double.parseDouble(str) * 1000), ISOChronology.getInstanceUTC()))

          .build();



  // TODO: BigQuery code should not be relying on Calcite metadata fields. If so, this belongs

  // in the SQL package.

  private static final Map<String, StandardSQLTypeName> BEAM_TO_BIGQUERY_LOGICAL_MAPPING =

      ImmutableMap.<String, StandardSQLTypeName>builder()

          .put("SqlDateType", StandardSQLTypeName.DATE)

          .put("SqlTimeType", StandardSQLTypeName.TIME)

          .put("SqlTimeWithLocalTzType", StandardSQLTypeName.TIME)

          .put("SqlTimestampWithLocalTzType", StandardSQLTypeName.TIMESTAMP)

          .put("SqlCharType", StandardSQLTypeName.STRING)

          .build();



  /**

   * Get the corresponding BigQuery {@link StandardSQLTypeName} for supported Unified {@link

   * FieldType}.

   */

  private static StandardSQLTypeName toStandardSQLTypeName(FieldType fieldType) {

    if (fieldType.getTypeName().isLogicalType()) {

      StandardSQLTypeName foundType =

          BEAM_TO_BIGQUERY_LOGICAL_MAPPING.get(fieldType.getLogicalType().getIdentifier());

      if (foundType != null) {

        return foundType;

      }

    }

    return BEAM_TO_BIGQUERY_TYPE_MAPPING.get(fieldType.getTypeName());

  }



  private static List<TableFieldSchema> toTableFieldSchema(Schema schema) {

    List<TableFieldSchema> fields = new ArrayList<>(schema.getFieldCount());

    for (Field schemaField : schema.getFields()) {

      FieldType type = schemaField.getType();



      TableFieldSchema field = new TableFieldSchema().setName(schemaField.getName());

      if (schemaField.getDescription() != null && !"".equals(schemaField.getDescription())) {

        field.setDescription(schemaField.getDescription());

      }



      if (!schemaField.getType().getNullable()) {

        field.setMode(Mode.REQUIRED.toString());

      }

      if (TypeName.ARRAY == type.getTypeName()) {

        type = type.getCollectionElementType();

        if (type.getTypeName().isCollectionType() || type.getTypeName().isMapType()) {

          throw new IllegalArgumentException("Array of collection is not supported in BigQuery.");

        }

        field.setMode(Mode.REPEATED.toString());

      }

      if (TypeName.ROW == type.getTypeName()) {

        Schema subType = type.getRowSchema();

        field.setFields(toTableFieldSchema(subType));

      }

      if (TypeName.MAP == type.getTypeName()) {

        throw new IllegalArgumentException("Maps are not supported in BigQuery.");

      }

      field.setType(toStandardSQLTypeName(type).toString());



      fields.add(field);

    }

    return fields;

  }



  /** Convert a Unified {@link Schema} to a BigQuery {@link TableSchema}. */

  public static TableSchema toTableSchema(Schema schema) {

    return new TableSchema().setFields(toTableFieldSchema(schema));

  }



  private static final SerializableFunction<Row, TableRow> ROW_TO_TABLE_ROW =

      new ToTableRow(SerializableFunctions.identity());



  /** Convert a Unified {@link Row} to a BigQuery {@link TableRow}. */

  public static SerializableFunction<Row, TableRow> toTableRow() {

    return ROW_TO_TABLE_ROW;

  }



  /** Convert a Unified schema type to a BigQuery {@link TableRow}. */

  public static <T> SerializableFunction<T, TableRow> toTableRow(

      SerializableFunction<T, Row> toRow) {

    return new ToTableRow<>(toRow);

  }



  /** Convert a Unified {@link Row} to a BigQuery {@link TableRow}. */

  private static class ToTableRow<T> implements SerializableFunction<T, TableRow> {

    private final SerializableFunction<T, Row> toRow;



    ToTableRow(SerializableFunction<T, Row> toRow) {

      this.toRow = toRow;

    }



    @Override

    public TableRow apply(T input) {

      return toTableRow(toRow.apply(input));

    }

  }



  public static Row toUnifiedRow(GenericRecord record, Schema schema, ConversionOptions options) {

    List<Object> valuesInOrder =

        schema.getFields().stream()

            .map(field -> convertAvroFormat(field, record.get(field.getName()), options))

            .collect(toList());



    return Row.withSchema(schema).addValues(valuesInOrder).build();

  }



  /** Convert a BigQuery TableRow to a Unified Row. */

  public static TableRow toTableRow(Row row) {

    TableRow output = new TableRow();

    for (int i = 0; i < row.getFieldCount(); i++) {

      Object value = row.getValue(i);

      Field schemaField = row.getSchema().getField(i);

      output = output.set(schemaField.getName(), fromUnifiedField(schemaField.getType(), value));

    }

    return output;

  }



  private static Object fromUnifiedField(FieldType fieldType, Object fieldValue) {

    if (fieldValue == null) {

      if (!fieldType.getNullable()) {

        throw new IllegalArgumentException("Field is not nullable.");

      }

      return null;

    }



    switch (fieldType.getTypeName()) {

      case ARRAY:

        FieldType elementType = fieldType.getCollectionElementType();

        List items = (List) fieldValue;

        List convertedItems = Lists.newArrayListWithCapacity(items.size());

        for (Object item : items) {

          convertedItems.add(fromUnifiedField(elementType, item));

        }

        return convertedItems;



      case ROW:

        return toTableRow((Row) fieldValue);



      case DATETIME:

        DateTimeFormatter patternFormat =

            new DateTimeFormatterBuilder()

                .appendPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZZ")

                .toFormatter();

        return ((Instant) fieldValue).toDateTime().toString(patternFormat);



      case INT16:

      case INT32:

      case INT64:

      case FLOAT:

      case DOUBLE:

      case STRING:

      case BOOLEAN:

        return fieldValue.toString();



      case DECIMAL:

        return fieldValue.toString();



      case BYTES:

        ByteBuffer byteBuffer = (ByteBuffer) fieldValue;

        byte[] bytes = new byte[byteBuffer.limit()];

        byteBuffer.get(bytes);

        return BaseEncoding.base64().encode(bytes);



      default:

        return fieldValue;

    }

  }



  /**

   * Tries to parse the JSON {@link TableRow} from BigQuery.

   *

   * <p>Only supports basic types and arrays. Doesn't support date types.

   */

  public static Row toUnifiedRow(Schema rowSchema, TableSchema bqSchema, TableRow jsonBqRow) {

    List<TableFieldSchema> bqFields = bqSchema.getFields();



    Map<String, Integer> bqFieldIndices =

        IntStream.range(0, bqFields.size())

            .boxed()

            .collect(toMap(i -> bqFields.get(i).getName(), i -> i));



    List<Object> rawJsonValues =

        rowSchema.getFields().stream()

            .map(field -> bqFieldIndices.get(field.getName()))

            .map(index -> jsonBqRow.getF().get(index).getV())

            .collect(toList());



    return IntStream.range(0, rowSchema.getFieldCount())

        .boxed()

        .map(index -> toUnifiedValue(rowSchema.getField(index).getType(), rawJsonValues.get(index)))

        .collect(toRow(rowSchema));

  }



  private static Object toUnifiedValue(FieldType fieldType, Object jsonBQValue) {

    if (jsonBQValue instanceof String && JSON_VALUE_PARSERS.containsKey(fieldType.getTypeName())) {

      return JSON_VALUE_PARSERS.get(fieldType.getTypeName()).apply((String) jsonBQValue);

    }



    if (jsonBQValue instanceof List) {

      return ((List<Object>) jsonBQValue)

          .stream()

              .map(v -> ((Map<String, Object>) v).get("v"))

              .map(v -> toUnifiedValue(fieldType.getCollectionElementType(), v))

              .collect(toList());

    }



    throw new UnsupportedOperationException(

        "Converting BigQuery type '"

            + jsonBQValue.getClass()

            + "' to '"

            + fieldType

            + "' is not supported");

  }



  // TODO: BigQuery shouldn't know about SQL internal logical types.

  private static final Set<String> SQL_DATE_TIME_TYPES =

      ImmutableSet.of(

          "SqlDateType", "SqlTimeType", "SqlTimeWithLocalTzType", "SqlTimestampWithLocalTzType");

  private static final Set<String> SQL_STRING_TYPES = ImmutableSet.of("SqlCharType");



  /**

   * Tries to convert an Avro decoded value to a Unified field value based on the target type of the

   * Unified field.

   */

  public static Object convertAvroFormat(

	  Field unifiedField, Object avroValue, BigQueryUtils.ConversionOptions options) {

    TypeName unifiedFieldTypeName = unifiedField.getType().getTypeName();

    switch (unifiedFieldTypeName) {

      case INT16:

      case INT32:

      case INT64:

      case FLOAT:

      case DOUBLE:

      case BYTE:

      case BOOLEAN:

        return convertAvroPrimitiveTypes(unifiedFieldTypeName, avroValue);

      case DATETIME:

        // Expecting value in microseconds.

        switch (options.getTruncateTimestamps()) {

          case TRUNCATE:

            return truncateToMillis(avroValue);

          case REJECT:

            return safeToMillis(avroValue);

          default:

            throw new IllegalArgumentException(

                String.format(

                    "Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));

        }

      case STRING:

        return convertAvroPrimitiveTypes(unifiedFieldTypeName, avroValue);

      case ARRAY:

        return convertAvroArray(unifiedField, avroValue);

      case LOGICAL_TYPE:

        String identifier = unifiedField.getType().getLogicalType().getIdentifier();

        if (SQL_DATE_TIME_TYPES.contains(identifier)) {

          switch (options.getTruncateTimestamps()) {

            case TRUNCATE:

              return truncateToMillis(avroValue);

            case REJECT:

              return safeToMillis(avroValue);

            default:

              throw new IllegalArgumentException(

                  String.format(

                      "Unknown timestamp truncation option: %s", options.getTruncateTimestamps()));

          }

        } else if (SQL_STRING_TYPES.contains(identifier)) {

          return convertAvroPrimitiveTypes(TypeName.STRING, avroValue);

        } else {

          throw new RuntimeException("Unknown logical type " + identifier);

        }

      case DECIMAL:

        throw new RuntimeException("Does not support converting DECIMAL type value");

      case MAP:

        throw new RuntimeException("Does not support converting MAP type value");

      default:

        throw new RuntimeException(

            "Does not support converting unknown type value: " + unifiedFieldTypeName);

    }

  }



  private static ReadableInstant safeToMillis(Object value) {

    long subMilliPrecision = ((long) value) % 1000;

    if (subMilliPrecision != 0) {

      throw new IllegalArgumentException(

          String.format(

              "BigQuery data contained value %s with sub-millisecond precision, which Unified does"

                  + " not currently support."

                  + " You can enable truncating timestamps to millisecond precision"

                  + " by using BigQueryIO.withTruncatedTimestamps",

              value));

    } else {

      return truncateToMillis(value);

    }

  }



  private static ReadableInstant truncateToMillis(Object value) {

    return new Instant((long) value / 1000);

  }



  private static Object convertAvroArray(Field unifiedField, Object value) {

    // Check whether the type of array element is equal.

    List<Object> values = (List<Object>) value;

    List<Object> ret = new ArrayList();

    for (Object v : values) {

      ret.add(

          convertAvroPrimitiveTypes(

              unifiedField.getType().getCollectionElementType().getTypeName(), v));

    }

    return (Object) ret;

  }



  private static Object convertAvroString(Object value) {

    if (value == null) {

      return null;

    } else if (value instanceof org.apache.avro.util.Utf8) {

      return ((org.apache.avro.util.Utf8) value).toString();

    } else if (value instanceof String) {

      return value;

    } else {

      throw new RuntimeException(

          "Does not support converting avro format: " + value.getClass().getName());

    }

  }



  private static Object convertAvroPrimitiveTypes(TypeName unifiedType, Object value) {

    switch (unifiedType) {

      case BYTE:

        return ((Long) value).byteValue();

      case INT16:

        return ((Long) value).shortValue();

      case INT32:

        return ((Long) value).intValue();

      case INT64:

        return value;

      case FLOAT:

        return ((Double) value).floatValue();

      case DOUBLE:

        return (Double) value;

      case BOOLEAN:

        return (Boolean) value;

      case DECIMAL:

        throw new RuntimeException("Does not support converting DECIMAL type value");

      case STRING:

        return convertAvroString(value);

      default:

        throw new RuntimeException(unifiedType + " is not primitive type.");

    }

  }

}